001    /*
002     * CondorGLSFDispatcher.java
003     *
004     * Created on July 17, 2003, 11:17 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.condorg;
024    
025    import gov.bnl.star.offline.scheduler.*;
026    import gov.bnl.star.offline.scheduler.request.Request;
027    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
028    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher;
029    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
030    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
031    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder; //Moved Statistics recording to Scheduler.java LH
032    import gov.bnl.star.offline.scheduler.util.GenericResourceRequirementStringDefinition;
033    
034    import java.io.File;
035    import java.io.FileOutputStream;
036    import java.io.PrintStream;
037    import java.util.*;
038    
039    import java.util.logging.Level;
040    import java.util.logging.Logger;
041    
042    
043    /** Dispatches jobs using Condor-G on a remote site that uses LSF. It will use some
044     * extra rsl attributes created to command some extra features such as mail
045     * notification, resource usage, job name and target machine. These extra LSF
046     * attribute require a patch to the LSF job manager.
047     * @author Gabriele Carcassi
048     * @version 1.0 2003/07/23
049     */
050    public class CondorGLSFDispatcher extends LSFDispatcher {
051        static private Logger log = Logger.getLogger(CondorGLSFDispatcher.class.getName());
052    
053        private static String condorEx;
054        protected CSHApplication application;
055        
056        private String ResReqDefinitionObj;
057        
058        public void setResourceRequirementStringDefinition(String ResReqDefinitionObj){
059            this.ResReqDefinitionObj = ResReqDefinitionObj;
060            
061        }
062    
063        public void setCondorEx(String condorEx) {
064            this.condorEx = condorEx;
065        }
066        
067        public String getCondorEx() {
068            return condorEx;
069        }
070    
071        /** Creates a new dispatcher */
072        public CondorGLSFDispatcher() {
073        }
074    
075        /** Creates the scripts and dispatches the job on the target machine.
076         * @param request the job request
077         */
078        public void dispatch(Request request, List jobs) {
079            log.info("Dispatching using Condor-g and LSF: \"" + request.getCommand() +
080                "\"");
081    
082            // Enables the simulation mode if necessary
083            useSimulationMode(request.getSimulation());
084            reportedFailure = false;
085    
086            // Submits from the higher to the lower JobID. This way the
087            // user has a feel of  when the last job is going to be
088            // submitted
089            for (int nProcess = jobs.size() - 1; nProcess >= 0;
090                    nProcess--) {
091                Job job = (Job) jobs.get(nProcess);
092    
093                System.out.print("Dispatching process " +
094                    job.getJobID() + ".");
095                dispatch(request, job);
096            }
097    
098            //StatisticsRecorder.getIntance().recordStatistics(request, jobs); //Moved Statistics recording to Scheduler.java LH
099        }
100        
101        
102        public void setApplication(CSHApplication application){
103            this.application = application;
104        }
105    
106    
107        
108        public CSHApplication getApplication(){
109            return application;
110        }
111        
112    
113        protected void dispatch(Request request, Job job) {
114            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
115            
116            //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
117            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
118                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
119                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
120                log.warning(notSet);
121                System.out.println(notSet);
122                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
123            }
124            
125    
126            // TODO: all the parameters should be passed in one go
127            application.setJob(request, job);
128            application.setScratchDir(scratchDir);
129            application.setSubmissionCommand(getCondorGCommand(request, job));
130    
131            application.prepareJob();
132            prepareClassAd(request, job);
133    
134            log.info("Executing \"" + getCondorGCommand(request, job) + "\"");
135    
136            if (!simulation) {
137                try {
138                    Thread.sleep(getMsBtwnSuccess());
139                } catch (Exception e) {
140                }
141    
142                long StarTime = System.currentTimeMillis();
143                int attempt = 0;
144                boolean success = false;
145    
146                while (!success && (attempt < getMaxAttempts())) {
147                    try {
148                        CSHCommandLineTask task = new CSHCommandLineTask(getCondorGCommand(request, job), true, 30000);
149                        task.execute();
150    
151                        if (task.getExitStatus() != 0) {
152                            log.warning("bsub failed: " + task.getOutput());
153                            Thread.sleep(getMsBtwnFailure());
154                            System.out.print("/");
155                            attempt++;
156                        } else {
157                            success = true;
158                            job.DispatchSuccessful();
159                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim());
160                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
161                        }
162                    } catch (Exception e) {
163                        log.log(Level.SEVERE,
164                            "Couldn't submit the script to Condor-g", e);
165    
166                        try {
167                            Thread.sleep(getMsBtwnFailure());
168                        } catch (Exception e1) {
169                        }
170    
171                        System.out.print("/");
172                        attempt++;
173                    }
174                }
175    
176                if (success) {
177                    System.out.println(" done.");
178                } else {
179                    System.out.println(" FAILED!!");
180                }
181            } else {
182                System.out.println(" simulated.");
183            }
184        }
185    
186        /** Returns the command line to submit the job through condor-g.
187         * @param request the request that originated the job
188         * @param job the job to be dispatched
189         * @return the commandline to submit the job
190         */
191        protected String getCondorGCommand(Request request, Job job) {
192            return condorEx + " " + getClassAdName(request, job);
193        }
194    
195        /** Returns the name of the file containing the class ad. Class ad is the job
196         * description required by condor to submit a job.
197         * @param request the request that originated the job
198         * @param job the job to be submitted
199         * @return the file name of the class ad
200         */
201        protected String getClassAdName(Request request, Job job) {
202            return "sched" + job.getJobID() + ".condorg";
203        }
204    
205        private void prepareClassAd(Request request, Job job) {
206            try {
207                PrintStream classAd = new PrintStream(new FileOutputStream(
208                            new File(getClassAdName(request, job))));
209                createClassAd(request, job, classAd);
210            } catch (Exception e) {
211                log.log(Level.SEVERE, "Couldn't create the class ad", e);
212                throw new RuntimeException("Couldn't create the class ad " +
213                    getClassAdName(request, job) + ": " + e.getMessage());
214            }
215        }
216    
217        private void createClassAd(Request request, Job job,
218            PrintStream classAd) {
219            classAd.print("executable = ");
220            classAd.println(getExecutable());
221    
222            if (getArguments() != null) {
223                classAd.print("arguments = ");
224                classAd.println(getArguments());
225            }
226    
227            classAd.print("globusscheduler = ");
228            classAd.println(getGlobusScheduler());
229    
230            if (application.getStdin() != null) {
231                classAd.print("input = ");
232                classAd.println(application.getStdin());
233            }
234    
235            if (application.getStdout() != null) {
236                classAd.print("output = ");
237                classAd.println(application.getStdout());
238            }
239    
240            if (application.getStderr() != null) {
241                classAd.print("error = ");
242                classAd.println(application.getStderr());
243            }
244    
245            classAd.print("log = ");
246            classAd.println(getLogName(job));
247    
248            if (getRemoteDirectory() != null) {
249                classAd.print("remote_initialdir = ");
250                classAd.println(getRemoteDirectory());
251            }
252    
253            classAd.print("globusrsl =");
254    
255            if (job.getTarget() != null) {
256                classAd.print(" (xlsfmachine = ");
257                classAd.print(job.getTarget());
258                classAd.print(")");
259            }
260    
261            if (application.getJobName() != null) {
262                classAd.print(" (xlsfjobname = ");
263                classAd.print(application.getJobName());
264                classAd.print(")");
265            }
266    
267            if (request.getMail()) {
268                classAd.print(" (xlsfmailreport = ");
269                classAd.print("false");
270                classAd.print(")");
271            } else {
272                classAd.print(" (xlsfmailreport = ");
273                classAd.print("true");
274                classAd.print(")");
275            }
276            ////////////lbh
277            
278            GenericResourceRequirementStringDefinition lsfResReqDef = new GenericResourceRequirementStringDefinition();
279            if(ResReqDefinitionObj != null)
280            lsfResReqDef = (GenericResourceRequirementStringDefinition) ComponentLibrary.getInstance().getComponent(ResReqDefinitionObj);
281    
282            if ((getResourceUsageSwitch(job) != null)&&( lsfResReqDef.hasResourcesDefinition(job))) {
283                
284                String SD = "rusage" + getResourceUsageSwitch(job).subSequence(getResourceUsageSwitch(job).indexOf("["),getResourceUsageSwitch(job).indexOf("]")).toString() + "]";
285                String Res = "\\\"( " + lsfResReqDef.makeString(job).replaceAll("\\\"", "").concat(" ) ").concat(SD).concat("\\\"");
286                classAd.print(" (xlsfresources = ");
287                classAd.print(Res);
288                classAd.print(")");
289            }
290            else if(getResourceUsageSwitch(job) != null){
291                classAd.print(" (xlsfresources = ");
292                classAd.print(getResourceUsageSwitch(job));
293                classAd.print(")");
294            }
295            else if( lsfResReqDef.hasResourcesDefinition(job)){
296                classAd.print(" (xlsfresources = ");
297                classAd.print(lsfResReqDef.makeString(job));
298                classAd.print(")");
299            }
300            
301    
302    //        if (getResourceUsageSwitch(job) != null) {
303    //            classAd.print(" (xlsfresources = ");
304    //            classAd.print(getResourceUsageSwitch(job));
305    //            classAd.print(")");
306    //        }
307    
308            if (job.getQueue() != null) {
309                classAd.print(" (queue = ");
310                classAd.print(job.getQueue());
311                classAd.print(")");
312            }
313    
314            classAd.println();
315    
316            if (isTransferExecutable()) {
317                classAd.println("transfer_executable = true");
318            } else {
319                classAd.println("transfer_executable = false");
320            }
321            classAd.println("notification = never");
322            classAd.println("universe = globus");
323            classAd.println("queue");
324        }
325    
326        private String getExecutable() {
327            if (application.getCommandLine().indexOf(' ') == -1) {
328                return application.getCommandLine();
329            }
330    
331            return application.getCommandLine().substring(0,
332                application.getCommandLine().indexOf(' '));
333        }
334    
335        private String getArguments() {
336            if (application.getCommandLine().indexOf(' ') == -1) {
337                return null;
338            }
339    
340            return application.getCommandLine().substring(application.getCommandLine().indexOf(' ') + 1);
341        }
342    
343        private String getLogName(Job job) {
344            // TODO maybe log filename should be put as a general property of Process (as stds)
345            return "sched" + job.getJobID() + ".condorg.log";
346        }
347    
348        private String getGlobusScheduler() {
349            //TODO make it flexible
350            return getGlobusGatekeeper();
351        }
352        
353        private String gatekeeper;
354        
355        /** Holds value of property transferExecutable. */
356        private boolean transferExecutable;
357        
358        public void setGlobusGatekeeper(String gatekeeper) {
359            this.gatekeeper = gatekeeper;
360        }
361        
362        public String getGlobusGatekeeper() {
363            return gatekeeper;
364        }
365    
366        private String remoteInitialDir;
367        
368        public void setRemoteInitialDir(String remoteInitialDir) {
369            this.remoteInitialDir = remoteInitialDir;
370        }
371        
372        public String getRemoteInitialDir() {
373            return remoteInitialDir;
374        }
375        
376        private String getRemoteDirectory() {
377            // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory
378            if (".".equals(getRemoteInitialDir())) return FilesystemToolkit.getCurrentDirectory();
379            return getRemoteInitialDir();
380        }
381    
382        protected String getResourceUsageSwitch(Job job) {
383            String res = super.getResourceUsageSwitch(job);
384            if (res == null) return res;
385    
386            return res.replaceAll("\"", "\\\\\"");
387        }
388        
389        /** Getter for property transferExecutable.
390         * @return Value of property transferExecutable.
391         *
392         */
393        public boolean isTransferExecutable() {
394            return this.transferExecutable;
395        }
396        
397        /** Setter for property transferExecutable.
398         * @param transferExecutable New value of property transferExecutable.
399         *
400         */
401        public void setTransferExecutable(boolean transferExecutable) {
402            this.transferExecutable = transferExecutable;
403        }
404        
405        
406        
407       public void Kill(Request request, List jobs) {
408                //System.out.println("condor kill");
409            
410             for(int z=0; z != jobs.size(); z++){
411                Job job = (Job)jobs.get(z);
412            
413                if(job.getProcesseIDs().size() == 0){
414                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
415                    jobs.remove(z);
416                    z--;
417                }
418                else{
419                    for(int i=0; job.getProcesseIDs().size() != i; i++){
420    
421                        int attempt = 0;
422                        boolean success = false;
423                        String commmandOutput = "";
424                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
425    
426                        while (!success && (attempt < getMaxAttempts())) {
427                                try {
428                                   CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
429                                    task.execute();
430                                    if (task.getExitStatus() != 0) {
431                                        log.warning("condor_rm " + task.getOutput());
432                                        Thread.sleep(getMsBtwnFailure());
433                                        if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true;
434                                        System.out.print(task.getOutput());
435                                        attempt++;
436                                    } 
437                                    else{ 
438                                        success = true;
439                                        System.out.println("Killed");
440                                    }
441    
442                                    commmandOutput = task.getOutput();
443                                } 
444                                catch (Exception e) { System.out.print("condor_rm failed" + e); 
445                                System.out.print(commmandOutput);
446                                }
447                                try { Thread.sleep(getMsBtwnFailure());} 
448                                catch (Exception e1) {System.out.print("condor_rm failed");}
449                                    if(!success) System.out.print("/");
450                                    attempt++;
451                        }
452    
453                    }
454                    job.clearProcesseIDs();
455                    jobs.remove(z);
456                    z--;
457                }      
458           }
459        }    
460        
461        public String Status(Job job, int Processe) {
462                        if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
463                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
464                
465                    
466               // for(int i=0; job.getProcesseIDs().size() != i; i++){
467    
468                    int attempt = 0;
469                    boolean success = false;
470                    String commmandOutput = "";
471                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
472    
473                    while (!success && (attempt < getMaxAttempts())) {
474                            try {
475                               CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime());
476                                task.execute();
477                                if (task.getExitStatus() != 0) {
478                                    log.warning("condor_q " + task.getOutput());
479                                    Thread.sleep(getMsBtwnFailure());
480                                    
481                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
482                                    //return (task.getOutput().replace('\n',' ');
483                                    attempt++;
484                                } 
485                                else{ 
486                                    success = true;
487    
488                                    
489                                    if(task.getOutput().length() < 217) return("Done or Killed");
490                                    else{
491                                        String state = task.getOutput().substring(214,216).trim();
492                                        if( state.startsWith("R")) state = "RUN";
493                                        return(task.getOutput().substring(214,216).trim());
494                                    }
495    
496                              
497                                }
498    
499                                commmandOutput = task.getOutput();
500                            } 
501                            catch (Exception e) { System.out.print("condor_q failed" + e); 
502                            System.out.print(commmandOutput);
503                            }
504                            try { Thread.sleep(getMsBtwnFailure());} 
505                            catch (Exception e1) {System.out.print("condor_q failed");}
506                                if(!success) System.out.print("/");
507                                attempt++;
508                    }
509    
510               // }
511                
512           return "condor_q failed";
513        }
514        
515        public void stop() {
516        }    
517        
518    }